package com.bizmda.bizsip.sink.connector;

import cn.hutool.core.util.IdUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.bizmda.bizsip.common.BizException;
import com.bizmda.bizsip.common.BizResultEnum;
import com.bizmda.bizsip.config.AbstractSinkConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * @author 史正烨
 */
@Slf4j
public class RabbitmqSinkConnector extends AbstractSinkConnector implements ByteProcessInterface {
    private RabbitTemplate rabbitTemplate = null;
    private String routingKey;
    private String exchange;

    @Override
    public void init(AbstractSinkConfig sinkConfig) throws BizException {
        super.init(sinkConfig);
        this.exchange = (String) sinkConfig.getConnectorMap().get("exchange");
        this.routingKey = (String) sinkConfig.getConnectorMap().get("routing-key");
        log.info("初始化RabbitmqSinkConnector:exchange[{}],routing-key[{}]",this.exchange,this.routingKey);
        if (rabbitTemplate == null) {
            rabbitTemplate = SpringUtil.getBean("rabbitTemplate");
        }
    }

    @Override
    public byte[] process(byte[] inBytes) throws BizException {
        String correlationId = IdUtil.objectId();
        CorrelationData correlationData = new CorrelationData(correlationId);
        log.debug("correlationId:{},routeKey:{}", correlationId,this.routingKey);
        Message message = MessageBuilder.withBody(inBytes)
                .setCorrelationId(correlationData.getId()).setExpiration("20000").build();
        Message outMessage = rabbitTemplate.sendAndReceive(this.exchange, this.routingKey, message,correlationData);
        if (outMessage == null) {
            throw new BizException(BizResultEnum.SINK_MQ_TIMEOUT);
        }
        return outMessage.getBody();
    }
}
